Merge partition columns into scan statistics for data skipping #615

Draft · wants to merge 2 commits into main
Conversation

@timsaucer commented Dec 23, 2024

What changes are proposed in this pull request?

Currently the data skipping feature compares a predicate against the minValues and maxValues recorded in each add action of the log. When the user provides a predicate that needs to match against a partition value, those values are not included in the comparison, so every log entry passes the filter. With this change, we take the partition values and add them to the statistics record batch so that predicates can match against them, reducing the number of log entries returned.
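To illustrate the idea (a hypothetical sketch using the arrow crates, not the PR's actual code): for a file with partition value year = 2024, the merge synthesizes min/max "stats" equal to that constant, since min == max == the partition value for every row in the file:

// Hypothetical sketch: inject a partition value into synthetic minValues and
// maxValues columns so the existing min/max skipping logic can evaluate
// partition predicates unchanged. Names and shapes are illustrative only.
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, StructArray};
use arrow_schema::{DataType, Field};

fn partition_stats(year: i32) -> (ArrayRef, ArrayRef) {
    let field = Arc::new(Field::new("year", DataType::Int32, true));
    let value: ArrayRef = Arc::new(Int32Array::from(vec![year]));
    // min == max == the partition value for every row in the file
    let min_values: ArrayRef =
        Arc::new(StructArray::from(vec![(Arc::clone(&field), Arc::clone(&value))]));
    let max_values: ArrayRef = Arc::new(StructArray::from(vec![(field, value)]));
    (min_values, max_values)
}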

How was this change tested?

Tested against my current workflow in DataFusion, where I monitor the number of record batches returned from a query with a predicate applied.

I have expanded a unit test to include a partition based predicate.

@timsaucer timsaucer changed the title [WIP] Merge partition columns into scan statistics for data skipping Merge partition columns into scan statistics for data skipping Dec 23, 2024
@timsaucer timsaucer marked this pull request as ready for review December 23, 2024 19:25
codecov bot commented Dec 28, 2024

Codecov Report

Attention: Patch coverage is 86.33540% with 22 lines in your changes missing coverage. Please review.

Project coverage is 83.47%. Comparing base (c3a868f) to head (bd1b814).

Files with missing lines Patch % Lines
kernel/src/scan/data_skipping.rs 83.72% 3 Missing and 18 partials ⚠️
kernel/src/table_changes/log_replay/tests.rs 96.87% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #615      +/-   ##
==========================================
+ Coverage   83.45%   83.47%   +0.02%     
==========================================
  Files          74       74              
  Lines       16877    17038     +161     
  Branches    16877    17038     +161     
==========================================
+ Hits        14084    14223     +139     
- Misses       2135     2138       +3     
- Partials      658      677      +19     

☔ View full report in Codecov by Sentry.

@timsaucer (Author) commented:

Digging into the failures caused when compiling with --no-default-features, it looks like the approach I'm taking is an anti-pattern for the crate design. I'm trying to think of the best way to refactor it, and I'm open to suggestions.

@timsaucer (Author) commented:

I'm not sure of the best way to do this, but my first inclination would be to build an output struct of the partition values and then use a conditional expression that takes the partition values if they exist, or the stats values if not. But it looks like the expressions don't have conditionals, so that approach isn't useful.

I'm going to think about it, but I don't know the best way forward right now.

@roeap (Collaborator) commented Dec 29, 2024

@timsaucer - thanks for contributing, and you are correct: outside the default/arrow engine implementation, we want to make no assumptions about the representation of data. I believe many things are on the right track - i.e. extending the stats with min/max values for the partition columns.

The general idea would likely be to create an expression that assigns constant values extracted from the file actions - i.e. a struct expression that assigns literals to the respective fields - and then let the engine evaluate that expression.
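For example (a rough sketch only; the Literal and Struct variants on the kernel's Expression type are assumed here, and the real constructors may differ), the partitionValues map {"year": "2024", "region": "emea"} could become a struct expression of literals for the engine to evaluate:

// Rough sketch: assumes delta_kernel's Expression exposes Literal and Struct
// variants roughly like this; the real API may differ.
use delta_kernel::expressions::{Expression, Scalar};

fn partition_values_expr() -> Expression {
    // One literal per partition column, in schema order, for
    // partitionValues = {"year": "2024", "region": "emea"}:
    Expression::Struct(vec![
        Expression::Literal(Scalar::Integer(2024)),
        Expression::Literal(Scalar::String("emea".to_string())),
    ])
}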

Not entirely sure, but there were some discussions on how we can tell the engine the best way to generate some data for us. IIRC, expressions might be the way to go, but @nicklan may have some more recent thoughts on this.

Does this already help a bit? If not, I could dig a bit more into the code and try to give some more concrete hints.

@timsaucer (Author) commented:

Thank you, I appreciate the feedback. I'm going to think about it. I have very limited time over the next week, but I hope to get this resolved soon. For my use case, this will have a major impact on how well the predicates work.

@scovich (Collaborator) left a comment:

> The general idea would likely be to create an expression that assigns constant values extracted from the file actions - i.e. a struct expression that assigns literals to the respective fields - and then let the engine evaluate that expression.
>
> Not entirely sure, but there were some discussions on how we can tell the engine the best way to generate some data for us. IIRC, expressions might be the way to go, but @nicklan may have some more recent thoughts on this.

AFAIK, we have four challenges here:

  1. Extracting the requested partition values from the add.partitionValues string-string map.
  2. Parsing each string partition value into its proper type.
  3. Assembling the partition values into a proper struct.
  4. Teaching trait DataSkippingPredicateEvaluator and/or the impl of that trait for DataSkippingPredicateCreator to consume that struct (more below).

I can think of two main ways to approach 1/2/3:

  • (a) Use table-level expressions. We don't currently have any kind of map extractor expression (@hntd187 did some exploring a while ago, see Map access for expressions #352), and we also don't have any from-string casting expressions. Once we have those two, it's a simple matter to emit the desired struct expression.
    • PRO: One and done -- a single "table-level" expression applies to all data chunks we process.
    • PRO: Engine (not kernel) does the hard work of map probing and from-string casting.
    • CON: Complexity. Two whole new kinds of expressions we need to define and support.
    • CON: Higher risk that the engine's string parsing semantics do not match the Delta spec.
  • (b) Use the row visitor API to extract partition values and emit file-level expression literals. This is what the duckdb extension currently does (if you squint).
    • PRO: The row visitor API already provides map access, and PrimitiveType::parse_scalar already provides the needed from-string capability.
    • PRO: No worries about the engine's string parsing semantics; kernel handles it.
    • CON: The row visitor API will be inefficient for big tables with lots of file actions.
    • CON: Per-file expressions are more expensive to create and manage than per-table expressions. But this is less of a concern because other Delta features will require per-file expression support anyway (so why not use it here as well).

I favor per-file expressions leveraging the row visitor API, because it uses building blocks we already have. And kernel's mantra is "simplicity" and "don't pessimize" -- rather than "optimize" -- so it's ok if this is not the absolute most efficient approach possible.
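As a rough sketch of (b) (PrimitiveType::parse_scalar is the real helper named above; the surrounding function and its exact signature are assumptions):

// Rough sketch of approach (b): kernel parses each raw partition value string
// from add.partitionValues and emits a per-file literal expression.
use delta_kernel::expressions::{Expression, Scalar};
use delta_kernel::schema::{DataType, PrimitiveType};
use delta_kernel::DeltaResult;

fn partition_literal(ptype: &PrimitiveType, raw: Option<&str>) -> DeltaResult<Expression> {
    let scalar = match raw {
        // A missing entry in the map is a null partition value.
        None => Scalar::Null(DataType::Primitive(ptype.clone())),
        Some(s) => ptype.parse_scalar(s)?,
    };
    Ok(Expression::Literal(scalar))
}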

> my first inclination would be to do something along the lines of building an output struct of the partition values and then using a conditional expression to take the partition values if they exist or the stats values if not. But it looks like the expressions don't have conditionals, so that's not useful

This relates to item 4 from the list of challenges above. Fortunately, the set of partition columns is fixed for any table, so we don't need runtime conditional expressions -- instead, the data skipping expression generator can track the set of partition columns and conditionally emit stats- or partition-based data skipping expressions as appropriate.

One possibility is to change the data skipping predicate impl of get_min_stat and get_max_stat to conditionally return an expression over the partition value instead of the (non-existent) stat. See https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/scan/data_skipping.rs#L170-L178. That would be simple, and works perfectly for gt/lt/ge/le, but is sub-optimal for eq/ne because the current skipping logic has to assume min != max. That approach would also just plain fail for null checking predicates, because there are no rowcount or nullcount stats for partition columns (instead, the partition value itself is either null or non-null).

We could improve the eq/ne case by making DataSkippingPredicateEvaluator::eval_eq directly aware of partition values, instead of relying on eval_partial_cmp. Similarly, we could improve the null check case by making the DataSkippingPredicateEvaluator::eval_is_null impl for DataSkippingPredicateCreator directly aware of partition values.
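A hypothetical sketch of the get_min_stat option above (the "partitionValues" stats field name is invented for illustration; the signature and joined_column_expr! helper mirror the linked code):

// Hypothetical: fall back to the (constant) partition value for partition
// columns, which have no min/max stats.
fn get_min_stat(&self, col: &ColumnName, _data_type: &DataType) -> Option<Expr> {
    if self.partition_columns.contains(col) {
        // For a partition column, min == max == the partition value itself.
        Some(joined_column_expr!("partitionValues", col))
    } else {
        Some(joined_column_expr!("minValues", col))
    }
}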

.map(|(_, v)| v.to_string())
})
})
.collect::<Vec<Option<String>>>();

nit: use Itertools::collect_vec to avoid the turbofish
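For instance (illustrative only):

use itertools::Itertools;

// collect_vec() infers Vec<Option<String>> without the turbofish:
let values = ["a", "b"].iter().map(|s| Some(s.to_string())).collect_vec();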

Comment on lines +331 to +340
let mut kv =
    keys.iter()
        .zip(values.iter())
        .filter_map(|(k, v)| match (k, v) {
            (Some(k), Some(v)) => Some((k, v)),
            _ => None,
        });

kv.find(|(k, _)| *k == key.as_str())
    .map(|(_, v)| v.to_string())
@scovich (Collaborator) commented Dec 30, 2024

Why not just:

Suggested change
keys.iter()
    .zip(values.iter())
    .filter_map(|(k, v)| match (k, v) {
        (Some(k), Some(v)) if *k == key.as_str() => Some(v.to_string()),
        _ => None,
    })
    .last()

(have to return the last match because arrow and parquet maps are allowed to have duplicate keys, last key wins)

Comment on lines +389 to +392
let partitions_column = match partitions_batch.column_by_name("output") {
    Some(c) => c,
    None => return Ok(stats),
};

nit:

Suggested change
let Some(partitions_column) = partitions_batch.column_by_name("output") else {
    return Ok(stats);
};

Comment on lines +407 to +420
match field.name().as_str() {
    "minValues" => columns.push(merge_partition_fields_into_stats(
        stats_batch,
        idx,
        &partition_values,
    )?),
    "maxValues" => columns.push(merge_partition_fields_into_stats(
        stats_batch,
        idx,
        &partition_values,
    )?),
    _ => {
        columns.push(Arc::clone(stats_batch.column(idx)));
    }

nit:

Suggested change
let column = match field.name().as_str() {
    "minValues" | "maxValues" => merge_partition_fields_into_stats(
        stats_batch,
        idx,
        &partition_values,
    )?,
    _ => Arc::clone(stats_batch.column(idx)),
};
columns.push(column);

I don't understand what this code is doing, though. What should happen if a stat other than min or max is requested for a partition column?


Update: Figured it out -- partition values only get merged in for min/max stats; other stats remain unchanged.

.column(idx)
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| Error::engine_data_type("minValues"))?

Doesn't this method also get called for maxValues?

Comment on lines +374 to +375
Ok(Arc::new(StructArray::new(fields, arrays, nulls))
    as Arc<(dyn arrow_array::Array + 'static)>)

Is the cast necessary? I thought Arc decayed?

Suggested change
let array = Arc::new(StructArray::new(fields, arrays, nulls));
Ok(array)

or, if the cast is unavoidable:

Suggested change
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) as _)

@scovich (Collaborator) commented Dec 30, 2024

> I can think of two main ways to approach 1/2/3:
>
>   • (a) Use table-level expressions.
>   • (b) Use the row visitor API to extract partition values and emit file-level expression literals.

I just noticed that #607 is already starting to implement (b), though I suppose it could also be adapted to do (a) instead (it adds support for both table-level and file-level expressions).

@timsaucer (Author) commented:

Thank you, @scovich. This is incredibly helpful. I'll start digging into the row visitor API and how to apply it in this case. I'm moving this PR to draft since it will obviously need a fairly big change.
